{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Multi-Agent Workflow with Weaviate QueryAgent\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a href=\"https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/agent/multi_agent_workflow_with_weaviate_queryagent.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example, we will be building a LlamaIndex Agent Workflow that ends up being a multi-agent system that aims to be a Docs Assistant capable of:\n",
    "- Writing new content to a \"LlamaIndexDocs\" collection in Weaviate\n",
    "- Writing new content to a \"WeaviateDocs\" collection in Weaviate\n",
    "- Using the Weaviate [`QueryAgent`](https://weaviate.io/developers/agents/query) to answer questions based on the contents of these collections.\n",
    "\n",
    "The `QueryAgent` is a full agent prodcut by Weaviate, that is capable of doing regular search, as well as aggregations over the collections you give it access to. Our 'orchestrator' agent will decide when to invoke the Weaviate QueryAgent, leaving the job of creating Weaviate specific search queries to it.\n",
    "\n",
    "**Things you will need:**\n",
    "\n",
    "- An OpenAI API key (or switch to another provider and adjust the code below)\n",
    "- A Weaviate sandbox (this is free)\n",
    "- Your Weaviate sandbox URL and API key\n",
    "\n",
    "![Workflow Overview](../../../_static/agents/workflow-weaviate-multiagent.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Install & Import Dependencies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install llama-index-core llama-index-utils-workflow weaviate-client[agents] llama-index-llms-openai llama-index-readers-web"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.core.workflow import (\n",
    "    StartEvent,\n",
    "    StopEvent,\n",
    "    Workflow,\n",
    "    step,\n",
    "    Event,\n",
    "    Context,\n",
    ")\n",
    "from llama_index.utils.workflow import draw_all_possible_flows\n",
    "from llama_index.readers.web import SimpleWebPageReader\n",
    "from llama_index.core.llms import ChatMessage\n",
    "from llama_index.core.tools import FunctionTool\n",
    "from llama_index.llms.openai import OpenAI\n",
    "from llama_index.core.agent.workflow import FunctionAgent\n",
    "\n",
    "from enum import Enum\n",
    "from pydantic import BaseModel, Field\n",
    "from llama_index.llms.openai import OpenAI\n",
    "from typing import List, Union\n",
    "import json\n",
    "\n",
    "import weaviate\n",
    "from weaviate.auth import Auth\n",
    "from weaviate.agents.query import QueryAgent\n",
    "from weaviate.classes.config import Configure, Property, DataType\n",
    "\n",
    "import os\n",
    "from getpass import getpass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set up Weaviate\n",
    "\n",
    "To use the Weaviate Query Agent, first, create a [Weaviate Cloud](https://weaviate.io/deployment/serverless) account👇\n",
    "1. [Create Serverless Weaviate Cloud account](https://weaviate.io/deployment/serverless) and set up a free [Sandbox](https://weaviate.io/developers/wcs/manage-clusters/create#sandbox-clusters)\n",
    "2. Go to 'Embedding' and enable it, by default, this will make it so that we use `Snowflake/snowflake-arctic-embed-l-v2.0` as the embedding model\n",
    "3. Take note of the `WEAVIATE_URL` and `WEAVIATE_API_KEY` to connect to your cluster below\n",
    "\n",
    "> Info: We recommend using [Weaviate Embeddings](https://weaviate.io/developers/weaviate/model-providers/weaviate) so you do not have to provide any extra keys for external embedding providers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if \"WEAVIATE_API_KEY\" not in os.environ:\n",
    "    os.environ[\"WEAVIATE_API_KEY\"] = getpass(\"Add Weaviate API Key\")\n",
    "if \"WEAVIATE_URL\" not in os.environ:\n",
    "    os.environ[\"WEAVIATE_URL\"] = getpass(\"Add Weaviate URL\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = weaviate.connect_to_weaviate_cloud(\n",
    "    cluster_url=os.environ.get(\"WEAVIATE_URL\"),\n",
    "    auth_credentials=Auth.api_key(os.environ.get(\"WEAVIATE_API_KEY\")),\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create WeaviateDocs and LlamaIndexDocs Collections\n",
    "\n",
    "The helper function below will create a \"WeaviateDocs\" and \"LlamaIndexDocs\" collection in Weaviate (if they don't exist already). It will also set up a `QueryAgent` that has access to both of these collections.\n",
    "\n",
    "The Weaviate [`QueryAgent`](https://weaviate.io/blog/query-agent) is designed to be able to query Weviate Collections for both regular search and aggregations, and also handles the burden of creating the Weaviate specific queries internally.\n",
    "\n",
    "The Agent will use the collection descriptions, as well as the property descriptions while formilating the queries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def fresh_setup_weaviate(client):\n",
    "    if client.collections.exists(\"WeaviateDocs\"):\n",
    "        client.collections.delete(\"WeaviateDocs\")\n",
    "    client.collections.create(\n",
    "        \"WeaviateDocs\",\n",
    "        description=\"A dataset with the contents of Weaviate technical Docs and website\",\n",
    "        vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),\n",
    "        properties=[\n",
    "            Property(\n",
    "                name=\"url\",\n",
    "                data_type=DataType.TEXT,\n",
    "                description=\"the source URL of the webpage\",\n",
    "            ),\n",
    "            Property(\n",
    "                name=\"text\",\n",
    "                data_type=DataType.TEXT,\n",
    "                description=\"the content of the webpage\",\n",
    "            ),\n",
    "        ],\n",
    "    )\n",
    "\n",
    "    if client.collections.exists(\"LlamaIndexDocs\"):\n",
    "        client.collections.delete(\"LlamaIndexDocs\")\n",
    "    client.collections.create(\n",
    "        \"LlamaIndexDocs\",\n",
    "        description=\"A dataset with the contents of LlamaIndex technical Docs and website\",\n",
    "        vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),\n",
    "        properties=[\n",
    "            Property(\n",
    "                name=\"url\",\n",
    "                data_type=DataType.TEXT,\n",
    "                description=\"the source URL of the webpage\",\n",
    "            ),\n",
    "            Property(\n",
    "                name=\"text\",\n",
    "                data_type=DataType.TEXT,\n",
    "                description=\"the content of the webpage\",\n",
    "            ),\n",
    "        ],\n",
    "    )\n",
    "\n",
    "    agent = QueryAgent(\n",
    "        client=client, collections=[\"LlamaIndexDocs\", \"WeaviateDocs\"]\n",
    "    )\n",
    "    return agent"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Write Contents of Webpage to the Collections\n",
    "\n",
    "The helper function below uses the `SimpleWebPageReader` to write the contents of a webpage to the relevant Weaviate collection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def write_webpages_to_weaviate(client, urls: list[str], collection_name: str):\n",
    "    documents = SimpleWebPageReader(html_to_text=True).load_data(urls)\n",
    "    collection = client.collections.get(collection_name)\n",
    "    with collection.batch.dynamic() as batch:\n",
    "        for doc in documents:\n",
    "            batch.add_object(properties={\"url\": doc.id_, \"text\": doc.text})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Function Calling Agent\n",
    "\n",
    "Now that we have the relevant functions to write to a collection and also the `QueryAgent` at hand, we can start by using the `FunctionAgent`, which is a simple tool calling agent."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if \"OPENAI_API_KEY\" not in os.environ:\n",
    "    os.environ[\"OPENAI_API_KEY\"] = getpass(\"openai-key\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "weaviate_agent = fresh_setup_weaviate(client)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "llm = OpenAI(model=\"gpt-4o-mini\")\n",
    "\n",
    "\n",
    "def write_to_weaviate_collection(urls=list[str]):\n",
    "    \"\"\"Useful for writing new content to the WeaviateDocs collection\"\"\"\n",
    "    write_webpages_to_weaviate(client, urls, \"WeaviateDocs\")\n",
    "\n",
    "\n",
    "def write_to_li_collection(urls=list[str]):\n",
    "    \"\"\"Useful for writing new content to the LlamaIndexDocs collection\"\"\"\n",
    "    write_webpages_to_weaviate(client, urls, \"LlamaIndexDocs\")\n",
    "\n",
    "\n",
    "def query_agent(query: str) -> str:\n",
    "    \"\"\"Useful for asking questions about Weaviate and LlamaIndex\"\"\"\n",
    "    response = weaviate_agent.run(query)\n",
    "    return response.final_answer\n",
    "\n",
    "\n",
    "agent = FunctionAgent(\n",
    "    tools=[write_to_weaviate_collection, write_to_li_collection, query_agent],\n",
    "    llm=llm,\n",
    "    system_prompt=\"\"\"You are a helpful assistant that can write the\n",
    "      contents of urls to WeaviateDocs and LlamaIndexDocs collections,\n",
    "      as well as forwarding questions to a QueryAgent\"\"\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = await agent.run(\n",
    "    user_msg=\"Can you save https://docs.llamaindex.ai/en/stable/examples/agent/agent_workflow_basic/\"\n",
    ")\n",
    "print(str(response))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Llama Index workflows refer to orchestrations involving one or more AI agents within the LlamaIndex framework. These workflows manage complex tasks dynamically by leveraging components such as large language models (LLMs), tools, and memory states. Key features of Llama Index workflows include:\n",
      "\n",
      "- Support for single or multiple agents managed within an AgentWorkflow orchestrator.\n",
      "- Ability to maintain state across runs via serializable context objects.\n",
      "- Integration of external tools with type annotations, including asynchronous functions.\n",
      "- Streaming of intermediate outputs and event-based interactions.\n",
      "- Human-in-the-loop capabilities to confirm or guide agent actions during workflow execution.\n",
      "\n",
      "These workflows enable agents to execute sequences of operations, call external tools asynchronously, maintain conversation or task states, stream partial results, and incorporate human inputs when necessary. They embody dynamic, agent-driven sequences of task decomposition, tool use, and reflection, allowing AI systems to plan, act, and improve iteratively toward specific goals.\n",
      "\n",
      "I have also saved the contents from the provided URLs to the WeaviateDocs collection.\n"
     ]
    }
   ],
   "source": [
    "response = await agent.run(\n",
    "    user_msg=\"\"\"What are llama index workflows? And can you save\n",
    "    these to weaviate docs: https://weaviate.io/blog/what-are-agentic-workflows\n",
    "    and https://weaviate.io/blog/ai-agents\"\"\"\n",
    ")\n",
    "print(str(response))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "You have a total of 2 documents in the WeaviateDocs collection and 1 document in the LlamaIndexDocs collection. In total, that makes 3 documents across both collections.\n"
     ]
    }
   ],
   "source": [
    "response = await agent.run(\n",
    "    user_msg=\"How many docs do I have in the weaviate and llamaindex collections in total?\"\n",
    ")\n",
    "print(str(response))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "weaviate_agent = fresh_setup_weaviate(client)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Workflow with Branches"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Simple Example: Create Events\n",
    "\n",
    "A LlamaIndex Workflow has 2 fundamentals:\n",
    "- An Event\n",
    "- A Step\n",
    "\n",
    "An step may return an event, and an event may trigger a step!\n",
    "\n",
    "For our use-case, we can imagine thet there are 4 events:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class EvaluateQuery(Event):\n",
    "    query: str\n",
    "\n",
    "\n",
    "class WriteLlamaIndexDocsEvent(Event):\n",
    "    urls: list[str]\n",
    "\n",
    "\n",
    "class WriteWeaviateDocsEvent(Event):\n",
    "    urls: list[str]\n",
    "\n",
    "\n",
    "class QueryAgentEvent(Event):\n",
    "    query: str"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Simple Example: A Branching Workflow (that does nothing yet)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class DocsAssistantWorkflow(Workflow):\n",
    "    @step\n",
    "    async def start(self, ctx: Context, ev: StartEvent) -> EvaluateQuery:\n",
    "        return EvaluateQuery(query=ev.query)\n",
    "\n",
    "    @step\n",
    "    async def evaluate_query(\n",
    "        self, ctx: Context, ev: EvaluateQuery\n",
    "    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent | StopEvent:\n",
    "        if ev.query == \"llama\":\n",
    "            return WriteLlamaIndexDocsEvent(urls=[ev.query])\n",
    "        if ev.query == \"weaviate\":\n",
    "            return WriteWeaviateDocsEvent(urls=[ev.query])\n",
    "        if ev.query == \"question\":\n",
    "            return QueryAgentEvent(query=ev.query)\n",
    "        return StopEvent()\n",
    "\n",
    "    @step\n",
    "    async def write_li_docs(\n",
    "        self, ctx: Context, ev: WriteLlamaIndexDocsEvent\n",
    "    ) -> StopEvent:\n",
    "        print(f\"Got a request to write something to LlamaIndexDocs\")\n",
    "        return StopEvent()\n",
    "\n",
    "    @step\n",
    "    async def write_weaviate_docs(\n",
    "        self, ctx: Context, ev: WriteWeaviateDocsEvent\n",
    "    ) -> StopEvent:\n",
    "        print(f\"Got a request to write something to WeaviateDocs\")\n",
    "        return StopEvent()\n",
    "\n",
    "    @step\n",
    "    async def query_agent(\n",
    "        self, ctx: Context, ev: QueryAgentEvent\n",
    "    ) -> StopEvent:\n",
    "        print(f\"Got a request to forward a query to the QueryAgent\")\n",
    "        return StopEvent()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "workflow_that_does_nothing = DocsAssistantWorkflow()\n",
    "\n",
    "# draw_all_possible_flows(workflow_that_does_nothing)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Got a request to write something to LlamaIndexDocs\n",
      "None\n"
     ]
    }
   ],
   "source": [
    "print(\n",
    "    await workflow_that_does_nothing.run(start_event=StartEvent(query=\"llama\"))\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Classify the Query with Structured Outputs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class SaveToLlamaIndexDocs(BaseModel):\n",
    "    \"\"\"The URLs to parse and save into a llama-index specific docs collection.\"\"\"\n",
    "\n",
    "    llama_index_urls: List[str] = Field(default_factory=list)\n",
    "\n",
    "\n",
    "class SaveToWeaviateDocs(BaseModel):\n",
    "    \"\"\"The URLs to parse and save into a weaviate specific docs collection.\"\"\"\n",
    "\n",
    "    weaviate_urls: List[str] = Field(default_factory=list)\n",
    "\n",
    "\n",
    "class Ask(BaseModel):\n",
    "    \"\"\"The natural language questions that can be asked to a Q&A agent.\"\"\"\n",
    "\n",
    "    queries: List[str] = Field(default_factory=list)\n",
    "\n",
    "\n",
    "class Actions(BaseModel):\n",
    "    \"\"\"Actions to take based on the latest user message.\"\"\"\n",
    "\n",
    "    actions: List[\n",
    "        Union[SaveToLlamaIndexDocs, SaveToWeaviateDocs, Ask]\n",
    "    ] = Field(default_factory=list)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a Workflow\n",
    "\n",
    "Let's create a workflow that, still, does nothing, but the incoming user query will be converted to our structure. Based on the contents of that structure, the workflow will decide which step to run.\n",
    "\n",
    "Notice how whichever step runs first, will return a `StopEvent`... This is good, but maybe we can improve that later!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.llms.openai import OpenAIResponses\n",
    "\n",
    "\n",
    "class DocsAssistantWorkflow(Workflow):\n",
    "    def __init__(self, *args, **kwargs):\n",
    "        self.llm = OpenAIResponses(model=\"gpt-4.1-mini\")\n",
    "        self.system_prompt = \"\"\"You are a docs assistant. You evaluate incoming queries and break them down to subqueries when needed.\n",
    "                          You decide on the next best course of action. Overall, here are the options:\n",
    "                          - You can write the contents of a URL to llamaindex docs (if it's a llamaindex url)\n",
    "                          - You can write the contents of a URL to weaviate docs (if it's a weaviate url)\n",
    "                          - You can answer a question about llamaindex and weaviate using the QueryAgent\"\"\"\n",
    "        super().__init__(*args, **kwargs)\n",
    "\n",
    "    @step\n",
    "    async def start(self, ev: StartEvent) -> EvaluateQuery:\n",
    "        return EvaluateQuery(query=ev.query)\n",
    "\n",
    "    @step\n",
    "    async def evaluate_query(\n",
    "        self, ev: EvaluateQuery\n",
    "    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent:\n",
    "        sllm = self.llm.as_structured_llm(Actions)\n",
    "        response = await sllm.achat(\n",
    "            [\n",
    "                ChatMessage(role=\"system\", content=self.system_prompt),\n",
    "                ChatMessage(role=\"user\", content=ev.query),\n",
    "            ]\n",
    "        )\n",
    "        actions = response.raw.actions\n",
    "        print(actions)\n",
    "        for action in actions:\n",
    "            if isinstance(action, SaveToLlamaIndexDocs):\n",
    "                return WriteLlamaIndexDocsEvent(urls=action.llama_index_urls)\n",
    "            elif isinstance(action, SaveToWeaviateDocs):\n",
    "                return WriteWeaviateDocsEvent(urls=action.weaviate_urls)\n",
    "            elif isinstance(action, Ask):\n",
    "                for query in action.queries:\n",
    "                    return QueryAgentEvent(query=query)\n",
    "\n",
    "    @step\n",
    "    async def write_li_docs(self, ev: WriteLlamaIndexDocsEvent) -> StopEvent:\n",
    "        print(f\"Writing {ev.urls} to LlamaIndex Docs\")\n",
    "        return StopEvent()\n",
    "\n",
    "    @step\n",
    "    async def write_weaviate_docs(\n",
    "        self, ev: WriteWeaviateDocsEvent\n",
    "    ) -> StopEvent:\n",
    "        print(f\"Writing {ev.urls} to Weaviate Docs\")\n",
    "        return StopEvent()\n",
    "\n",
    "    @step\n",
    "    async def query_agent(self, ev: QueryAgentEvent) -> StopEvent:\n",
    "        print(f\"Sending `'{ev.query}`' to agent\")\n",
    "        return StopEvent()\n",
    "\n",
    "\n",
    "everything_docs_agent_beta = DocsAssistantWorkflow()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def run_docs_agent_beta(query: str):\n",
    "    print(\n",
    "        await everything_docs_agent_beta.run(\n",
    "            start_event=StartEvent(query=query)\n",
    "        )\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[SaveToLlamaIndexDocs(llama_index_urls=['https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract', 'https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more'])]\n",
      "Writing ['https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract', 'https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more'] to LlamaIndex Docs\n",
      "None\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent_beta(\n",
    "    \"\"\"Can you save https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract\n",
    "    and https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more??\"\"\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Ask(queries=['How many documents are in the LlamaIndexDocs collection?'])]\n",
      "Sending `'How many documents are in the LlamaIndexDocs collection?`' to agent\n",
      "None\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent_beta(\n",
    "    \"How many documents do we have in the LlamaIndexDocs collection now?\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Ask(queries=['What are LlamaIndex workflows?'])]\n",
      "Sending `'What are LlamaIndex workflows?`' to agent\n",
      "None\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent_beta(\"What are LlamaIndex workflows?\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[SaveToWeaviateDocs(weaviate_urls=['https://weaviate.io/blog/graph-rag', 'https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks'])]\n",
      "Writing ['https://weaviate.io/blog/graph-rag', 'https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks'] to Weaviate Docs\n",
      "None\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent_beta(\n",
    "    \"Can you save https://weaviate.io/blog/graph-rag and https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks??\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run Multiple Branches & Put it all togehter\n",
    "\n",
    "In these cases, it makes sense to run multiple branches. So, a single step can trigger multiple events at once! We can `send_event` via the context 👇"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class ActionCompleted(Event):\n",
    "    result: str\n",
    "\n",
    "\n",
    "class DocsAssistantWorkflow(Workflow):\n",
    "    def __init__(self, *args, **kwargs):\n",
    "        self.llm = OpenAIResponses(model=\"gpt-4.1-mini\")\n",
    "        self.system_prompt = \"\"\"You are a docs assistant. You evaluate incoming queries and break them down to subqueries when needed.\n",
    "                      You decide on the next best course of action. Overall, here are the options:\n",
    "                      - You can write the contents of a URL to llamaindex docs (if it's a llamaindex url)\n",
    "                      - You can write the contents of a URL to weaviate docs (if it's a weaviate url)\n",
    "                      - You can answer a question about llamaindex and weaviate using the QueryAgent\"\"\"\n",
    "        super().__init__(*args, **kwargs)\n",
    "\n",
    "    @step\n",
    "    async def start(self, ctx: Context, ev: StartEvent) -> EvaluateQuery:\n",
    "        return EvaluateQuery(query=ev.query)\n",
    "\n",
    "    @step\n",
    "    async def evaluate_query(\n",
    "        self, ctx: Context, ev: EvaluateQuery\n",
    "    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent | None:\n",
    "        await ctx.store.set(\"results\", [])\n",
    "        sllm = self.llm.as_structured_llm(Actions)\n",
    "        response = await sllm.achat(\n",
    "            [\n",
    "                ChatMessage(role=\"system\", content=self.system_prompt),\n",
    "                ChatMessage(role=\"user\", content=ev.query),\n",
    "            ]\n",
    "        )\n",
    "        actions = response.raw.actions\n",
    "        await ctx.store.set(\"num_events\", len(actions))\n",
    "        await ctx.store.set(\"results\", [])\n",
    "        print(actions)\n",
    "        for action in actions:\n",
    "            if isinstance(action, SaveToLlamaIndexDocs):\n",
    "                ctx.send_event(\n",
    "                    WriteLlamaIndexDocsEvent(urls=action.llama_index_urls)\n",
    "                )\n",
    "            elif isinstance(action, SaveToWeaviateDocs):\n",
    "                ctx.send_event(\n",
    "                    WriteWeaviateDocsEvent(urls=action.weaviate_urls)\n",
    "                )\n",
    "            elif isinstance(action, Ask):\n",
    "                for query in action.queries:\n",
    "                    ctx.send_event(QueryAgentEvent(query=query))\n",
    "\n",
    "    @step\n",
    "    async def write_li_docs(\n",
    "        self, ctx: Context, ev: WriteLlamaIndexDocsEvent\n",
    "    ) -> ActionCompleted:\n",
    "        print(f\"Writing {ev.urls} to LlamaIndex Docs\")\n",
    "        write_webpages_to_weaviate(\n",
    "            client, urls=ev.urls, collection_name=\"LlamaIndexDocs\"\n",
    "        )\n",
    "        results = await ctx.store.get(\"results\")\n",
    "        results.append(f\"Wrote {ev.urls} it LlamaIndex Docs\")\n",
    "        return ActionCompleted(result=f\"Writing {ev.urls} to LlamaIndex Docs\")\n",
    "\n",
    "    @step\n",
    "    async def write_weaviate_docs(\n",
    "        self, ctx: Context, ev: WriteWeaviateDocsEvent\n",
    "    ) -> ActionCompleted:\n",
    "        print(f\"Writing {ev.urls} to Weaviate Docs\")\n",
    "        write_webpages_to_weaviate(\n",
    "            client, urls=ev.urls, collection_name=\"WeaviateDocs\"\n",
    "        )\n",
    "        results = await ctx.store.get(\"results\")\n",
    "        results.append(f\"Wrote {ev.urls} it Weavite Docs\")\n",
    "        return ActionCompleted(result=f\"Writing {ev.urls} to Weaviate Docs\")\n",
    "\n",
    "    @step\n",
    "    async def query_agent(\n",
    "        self, ctx: Context, ev: QueryAgentEvent\n",
    "    ) -> ActionCompleted:\n",
    "        print(f\"Sending {ev.query} to agent\")\n",
    "        response = weaviate_agent.run(ev.query)\n",
    "        results = await ctx.store.get(\"results\")\n",
    "        results.append(f\"QueryAgent responded with:\\n {response.final_answer}\")\n",
    "        return ActionCompleted(result=f\"Sending `'{ev.query}`' to agent\")\n",
    "\n",
    "    @step\n",
    "    async def collect(\n",
    "        self, ctx: Context, ev: ActionCompleted\n",
    "    ) -> StopEvent | None:\n",
    "        num_events = await ctx.store.get(\"num_events\")\n",
    "        evs = ctx.collect_events(ev, [ActionCompleted] * num_events)\n",
    "        if evs is None:\n",
    "            return None\n",
    "        return StopEvent(result=[ev.result for ev in evs])\n",
    "\n",
    "\n",
    "everything_docs_agent = DocsAssistantWorkflow(timeout=None)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def run_docs_agent(query: str):\n",
    "    handler = everything_docs_agent.run(start_event=StartEvent(query=query))\n",
    "    result = await handler\n",
    "    for response in await handler.ctx.store.get(\"results\"):\n",
    "        print(response)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[SaveToLlamaIndexDocs(llama_index_urls=['https://docs.llamaindex.ai/en/stable/understanding/workflows/']), SaveToLlamaIndexDocs(llama_index_urls=['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'])]\n",
      "Writing ['https://docs.llamaindex.ai/en/stable/understanding/workflows/'] to LlamaIndex Docs\n",
      "Writing ['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'] to LlamaIndex Docs\n",
      "Wrote ['https://docs.llamaindex.ai/en/stable/understanding/workflows/'] it LlamaIndex Docs\n",
      "Wrote ['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'] it LlamaIndex Docs\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent(\n",
    "    \"Can you save https://docs.llamaindex.ai/en/stable/understanding/workflows/ and https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Ask(queries=['How many documents are in the LlamaIndexDocs collection?'])]\n",
      "Sending How many documents are in the LlamaIndexDocs collection? to agent\n",
      "QueryAgent responded with:\n",
      " The LlamaIndexDocs collection contains 2 documents, specifically related to workflows and branches and loops within the documentation.\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent(\n",
    "    \"How many documents do we have in the LlamaIndexDocs collection now?\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Ask(queries=['What are LlamaIndex workflows?'])]\n",
      "Sending What are LlamaIndex workflows? to agent\n",
      "QueryAgent responded with:\n",
      " LlamaIndex workflows are an event-driven, step-based framework designed to control and manage the execution flow of complex applications, particularly those involving generative AI. They break an application into discrete Steps, each triggered by Events and capable of emitting further Events, allowing for complex logic involving loops, branches, and parallel execution.\n",
      "\n",
      "In a LlamaIndex workflow, steps perform functions ranging from simple tasks to complex agents, with inputs and outputs communicated via Events. This event-driven model facilitates maintainability and clarity, overcoming limitations of previous approaches like directed acyclic graphs (DAGs) which struggled with complex flows involving loops and branching.\n",
      "\n",
      "Key features include:\n",
      "- **Loops:** Steps can return events that loop back to previous steps to enable iterative processes.\n",
      "- **Branches:** Workflows can branch into different paths based on conditions, allowing for multiple distinct sequences of steps.\n",
      "- **Parallelism:** Multiple branches or steps can run concurrently and synchronize their results.\n",
      "- **State Maintenance:** Workflows support maintaining state and context throughout execution.\n",
      "- **Observability and Debugging:** Supported by various components and callbacks for monitoring.\n",
      "\n",
      "An example workflow might involve judging whether a query is of sufficient quality, looping to improve it if not, then concurrently executing different retrieval-augmented generation (RAG) strategies, and finally judging their responses to produce a single output.\n",
      "\n",
      "Workflows are especially useful as applications grow in complexity, enabling developers to organize and control intricate AI logic more naturally and efficiently than traditional graph-based methods. For simpler pipelines, LlamaIndex suggests using workflows optionally, but for advanced agentic applications, workflows provide a flexible and powerful control abstraction.\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent(\n",
    "    \"What are LlamaIndex workflows? And can you save https://weaviate.io/blog/graph-rag\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Ask(queries=['How to use loops in llamaindex workflows'])]\n",
      "Sending How to use loops in llamaindex workflows to agent\n",
      "QueryAgent responded with:\n",
      " In LlamaIndex workflows, loops are implemented using an event-driven approach where you define custom event types and steps that emit events to control the workflow's execution flow. To create a loop, you define a custom event (e.g., `LoopEvent`) and a workflow step that can return either the event continuing the loop or another event to proceed. For example, a workflow step might randomly decide to either loop back (emit `LoopEvent` again) or continue to a next step emitting a different event.\n",
      "\n",
      "This allows creating flexible looping behaviors where any step can loop back to any other step by returning the corresponding event instances. The approach leverages Python's async functions decorated with `@step`, which process events and return the next event(s), enabling both loops and conditional branching in workflows.\n",
      "\n",
      "Thus, loops in LlamaIndex workflows are event-based, using custom event types and the return of events from steps to signal iterations until a condition is met.\n",
      "\n",
      "Example:\n",
      "\n",
      "```python\n",
      "from llamaindex.workflow import Workflow, Event, StartEvent, StopEvent, step\n",
      "import random\n",
      "\n",
      "class LoopEvent(Event):\n",
      "    loop_output: str\n",
      "\n",
      "class FirstEvent(Event):\n",
      "    first_output: str\n",
      "\n",
      "class MyWorkflow(Workflow):\n",
      "    @step\n",
      "    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:\n",
      "        if random.randint(0, 1) == 0:\n",
      "            print(\"Bad thing happened\")\n",
      "            return LoopEvent(loop_output=\"Back to step one.\")\n",
      "        else:\n",
      "            print(\"Good thing happened\")\n",
      "            return FirstEvent(first_output=\"First step complete.\")\n",
      "\n",
      "    # ... other steps ...\n",
      "\n",
      "# Running this workflow will cause step_one to loop randomly until it proceeds.\n",
      "```\n",
      "\n",
      "You can combine loops with branching and parallel execution in workflows to build complex control flows. For detailed guidance and examples, consult the LlamaIndex documentation under \"Branches and Loops\" and the \"Workflows\" guides.\n"
     ]
    }
   ],
   "source": [
    "await run_docs_agent(\"How do I use loops in llamaindex workflows?\")"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [
    "rurysjPlqCae",
    "wt0MVTTkrR4e",
    "eHQHV4szr1WX",
    "pQOukan2wQZ0",
    "GDtnnETPzsAC"
   ],
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Python 3",
   "name": "python3"
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
